5ae2c7cbc66aa187b063b89535ccbe0fdded4673,cdap-app-templates/cdap-etl/cdap-data-pipeline/src/test/java/co/cask/cdap/datapipeline/DataPipelineTest.java,DataPipelineTest,testInnerJoin,#,734
Before Change
Assert.assertEquals(expected, actual);
validateMetric(2, appId, "testJoiner.records.out");
validateMetric(2, appId, "sink1.records.in");
}
@Test
After Change
String input2Name = "source2InnerJoinInput-" + engine;
String input3Name = "source3InnerJoinInput-" + engine;
String outputName = "innerJoinOutput-" + engine;
String joinerName = "innerJoiner-" + engine;
String sinkName = "innerJoinSink-" + engine;
ETLBatchConfig etlConfig = ETLBatchConfig.builder("* * * * *")
.addStage(new ETLStage("source1", MockSource.getPlugin(input1Name)))
.addStage(new ETLStage("source2", MockSource.getPlugin(input2Name)))
.addStage(new ETLStage("source3", MockSource.getPlugin(input3Name)))
.addStage(new ETLStage("t1", FieldsPrefixTransform.getPlugin("", inputSchema1.toString())))
.addStage(new ETLStage("t2", FieldsPrefixTransform.getPlugin("", inputSchema2.toString())))
.addStage(new ETLStage("t3", FieldsPrefixTransform.getPlugin("", inputSchema3.toString())))
.addStage(new ETLStage(joinerName, MockJoiner.getPlugin("t1.customer_id=t2.cust_id=t3.c_id&" +
"t1.customer_name=t2.cust_name=t3.c_name",
"t1,t2,t3", "")))
.addStage(new ETLStage(sinkName, MockSink.getPlugin(outputName)))
.addConnection("source1", "t1")
.addConnection("source2", "t2")
.addConnection("source3", "t3")
.addConnection("t1", joinerName)
.addConnection("t2", joinerName)
.addConnection("t3", joinerName)
.addConnection(joinerName, sinkName)
.setEngine(engine)
.build();
AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(APP_ARTIFACT, etlConfig);
Id.Application appId = Id.Application.from(Id.Namespace.DEFAULT, "JoinerApp");
ApplicationManager appManager = deployApplication(appId, appRequest);
Schema outSchema = Schema.recordOf(
"join.output",
Schema.Field.of("customer_id", Schema.of(Schema.Type.STRING)),
Schema.Field.of("customer_name", Schema.of(Schema.Type.STRING)),
Schema.Field.of("item_id", Schema.of(Schema.Type.STRING)),
Schema.Field.of("item_price", Schema.of(Schema.Type.LONG)),
Schema.Field.of("cust_id", Schema.of(Schema.Type.STRING)),
Schema.Field.of("cust_name", Schema.of(Schema.Type.STRING)),
Schema.Field.of("t_id", Schema.of(Schema.Type.STRING)),
Schema.Field.of("c_id", Schema.of(Schema.Type.STRING)),
Schema.Field.of("c_name", Schema.of(Schema.Type.STRING))
);
StructuredRecord recordSamuel = StructuredRecord.builder(inputSchema1).set("customer_id", "1")
.set("customer_name", "samuel").build();
StructuredRecord recordBob = StructuredRecord.builder(inputSchema1).set("customer_id", "2")
.set("customer_name", "bob").build();
StructuredRecord recordJane = StructuredRecord.builder(inputSchema1).set("customer_id", "3")
.set("customer_name", "jane").build();
StructuredRecord recordCar = StructuredRecord.builder(inputSchema2).set("item_id", "11").set("item_price", 10000L)
.set("cust_id", "1").set("cust_name", "samuel").build();
StructuredRecord recordBike = StructuredRecord.builder(inputSchema2).set("item_id", "22").set("item_price", 100L)
.set("cust_id", "3").set("cust_name", "jane").build();
StructuredRecord recordTrasCar = StructuredRecord.builder(inputSchema3).set("t_id", "1").set("c_id", "1")
.set("c_name", "samuel").build();
StructuredRecord recordTrasBike = StructuredRecord.builder(inputSchema3).set("t_id", "2").set("c_id", "3")
.set("c_name", "jane").build();
// write one record to each source
DataSetManager<Table> inputManager = getDataset(Id.Namespace.DEFAULT, input1Name);
MockSource.writeInput(inputManager, ImmutableList.of(recordSamuel, recordBob, recordJane));
inputManager = getDataset(Id.Namespace.DEFAULT, input2Name);
MockSource.writeInput(inputManager, ImmutableList.of(recordCar, recordBike));
inputManager = getDataset(Id.Namespace.DEFAULT, input3Name);
MockSource.writeInput(inputManager, ImmutableList.of(recordTrasCar, recordTrasBike));
WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
workflowManager.start();
workflowManager.waitForFinish(5, TimeUnit.MINUTES);
StructuredRecord joinRecordSamuel = StructuredRecord.builder(outSchema)
.set("customer_id", "1").set("customer_name", "samuel")
.set("item_id", "11").set("item_price", 10000L).set("cust_id", "1").set("cust_name", "samuel")
.set("t_id", "1").set("c_id", "1").set("c_name", "samuel").build();
StructuredRecord joinRecordJane = StructuredRecord.builder(outSchema)
.set("customer_id", "3").set("customer_name", "jane")
.set("item_id", "22").set("item_price", 100L).set("cust_id", "3").set("cust_name", "jane")
.set("t_id", "2").set("c_id", "3").set("c_name", "jane").build();
DataSetManager<Table> sinkManager = getDataset(outputName);
Set<StructuredRecord> expected = ImmutableSet.of(joinRecordSamuel, joinRecordJane);
Set<StructuredRecord> actual = Sets.newHashSet(MockSink.readOutput(sinkManager));
Assert.assertEquals(expected, actual);
validateMetric(2, appId, joinerName + ".records.out");
validateMetric(2, appId, sinkName + ".records.in");
}